package com.innovation.ic.b1b.monitor.base.handler;

import com.google.common.base.Strings;
import com.innovation.ic.b1b.monitor.base.mapper.KafkaAvailableJobLogMapper;
import com.innovation.ic.b1b.monitor.base.mapper.KafkaAvailableJobMapper;
import com.innovation.ic.b1b.monitor.base.mapper.KafkaMapper;
import com.innovation.ic.b1b.monitor.base.model.Kafka;
import com.innovation.ic.b1b.monitor.base.model.KafkaAvailableJob;
import com.innovation.ic.b1b.monitor.base.model.KafkaAvailableJobLog;
import com.innovation.ic.b1b.monitor.base.pojo.constant.JobDataMapConstant;
import com.innovation.ic.b1b.monitor.base.pojo.enums.ActiveEnum;
import com.innovation.ic.b1b.monitor.base.value.TimeSetConfig;
import lombok.SneakyThrows;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.common.KafkaFuture;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.quartz.QuartzJobBean;
import javax.annotation.Resource;
import java.sql.Date;
import java.util.Properties;
import java.util.Random;
import java.util.Set;

/**
 * Quartz job that checks whether a configured Kafka instance is reachable,
 * records the result as a {@link KafkaAvailableJobLog} row, and sends an alarm
 * e-mail when the instance is considered down.
 *
 * @author linuo
 * @time   2023年5月8日13:34:56
 */
public class KafkaAvailableJobHandler extends QuartzJobBean {

    private static final Logger log = LoggerFactory.getLogger(KafkaAvailableJobHandler.class);

    /** Number of connection attempts before the Kafka instance is reported as down. */
    private static final int MAX_ATTEMPTS = 3;
    /** Timeout (ms) for AdminClient requests, including listTopics. */
    private static final int REQUEST_TIMEOUT_MS = 30000;
    /** Idle-connection timeout (ms) for the AdminClient. */
    private static final int CONNECTIONS_MAX_IDLE_MS = 10000;

    @Resource
    private KafkaAvailableJobMapper kafkaAvailableJobMapper;

    @Resource
    private KafkaAvailableJobLogMapper kafkaAvailableJobLogMapper;

    @Resource
    private KafkaMapper kafkaMapper;

    @Resource
    private HandlerHelper handlerHelper;

    @Resource
    private TimeSetConfig timeSetConfig;

    @SneakyThrows
    @Override
    protected void executeInternal(JobExecutionContext jobExecutionContext) {
        log.info("开始执行kafka可用任务处理");

        // Random 1-5 second jitter so jobs triggered at the same instant don't
        // all hit the broker / database simultaneously.
        int min = 1;
        int max = 5;
        int s = new Random().nextInt(max - min + 1) + min;
        log.info("休眠[{}]秒", s);
        Thread.sleep(s * 1000L);

        JobDataMap jobDataMap = jobExecutionContext.getMergedJobDataMap();
        int id = jobDataMap.getInt(JobDataMapConstant.ID);
        KafkaAvailableJob kafkaAvailableJob = kafkaAvailableJobMapper.selectById(id);
        if (kafkaAvailableJob == null) {
            log.info("监控kafka可用任务id为[{}]的数据不存在,无法执行任务", id);
            return;
        }

        Kafka kafka = kafkaMapper.selectById(kafkaAvailableJob.getKafkaId());
        if (kafka == null) {
            log.info("未在kafka表中查询到id=[{}]的配置信息,无法执行监控kafka可用任务", kafkaAvailableJob.getKafkaId());
            return;
        }

        String kafkaName = kafka.getName();

        // Liveness flag, pessimistic until a connection succeeds.
        Integer active = ActiveEnum.NO.getCode();

        KafkaAvailableJobLog kafkaAvailableJobLog = new KafkaAvailableJobLog();
        kafkaAvailableJobLog.setKafkaAvailableJobId(id);
        kafkaAvailableJobLog.setStartTime(new Date(System.currentTimeMillis()));

        if (Strings.isNullOrEmpty(kafka.getIp()) || Strings.isNullOrEmpty(kafka.getPort())) {
            String message = kafkaName + "配置信息不完整,无法检测是否存活";
            log.info(message);
            kafkaAvailableJobLog.setDescription(message);
        } else {
            // null means reachable; otherwise the (truncated) failure reason.
            String message = judgeKafkaIfActive(kafka);
            if (message == null) {
                active = ActiveEnum.YES.getCode();
            } else {
                kafkaAvailableJobLog.setDescription(message);
            }
        }

        kafkaAvailableJobLog.setActive(active);
        int insert = kafkaAvailableJobLogMapper.insert(kafkaAvailableJobLog);
        if (insert > 0) {
            // Fixed copy-paste bug: this is the kafka job log, not mongodb.
            log.info("kafka可用任务日志插入成功");

            if (active.intValue() == ActiveEnum.NO.getCode().intValue()) {
                // Instance down: alarm the configured recipients by e-mail.
                String message = kafkaName + "出现异常，原因：" + kafkaAvailableJobLog.getDescription() + "，请查看kafka状态并及时进行异常处理。";
                handlerHelper.sendEmail(kafkaAvailableJob.getAlarmEmail(), message, null);
            }
        }
    }

    /**
     * Determines whether the Kafka instance is reachable, retrying up to
     * {@link #MAX_ATTEMPTS} times with a configurable pause between attempts.
     *
     * @param kafka kafka connection configuration (ip + port must be non-empty)
     * @return {@code null} when the instance is reachable, otherwise the last
     *         failure reason truncated to 100 characters
     *         (presumably the log table's column limit — TODO confirm schema)
     * @throws InterruptedException if the thread is interrupted while sleeping
     *         between retries or while waiting for the broker response
     */
    private String judgeKafkaIfActive(Kafka kafka) throws InterruptedException {
        String server = kafka.getIp() + ":" + kafka.getPort();

        String message = null;
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            log.info("第{}次执行监控kafka是否可用任务", attempt);
            message = tryListTopics(server);
            if (message == null) {
                // Connected and listed topics successfully — instance is alive.
                return null;
            }
            if (attempt < MAX_ATTEMPTS) {
                log.warn("监控kafka是否可用任务出现问题,原因:[{}],待{}秒后重试", message, timeSetConfig.getRetryWaitTime());
                Thread.sleep(timeSetConfig.getRetryWaitTime() * 1000L);
            } else {
                log.warn("监控kafka是否可用任务出现问题,原因:[{}],kafka连接失败", message);
            }
        }

        // Truncate so the description fits the log table column.
        if (message != null && message.length() > 100) {
            message = message.substring(0, 100);
        }
        return message;
    }

    /**
     * Performs a single connection attempt: creates an {@link AdminClient} and
     * lists the broker's topics. The client is closed via try-with-resources.
     *
     * @param server {@code host:port} bootstrap address
     * @return {@code null} on success, otherwise the exception's string form
     * @throws InterruptedException if interrupted while waiting for the broker;
     *         the interrupt flag is restored before rethrowing
     */
    private String tryListTopics(String server) throws InterruptedException {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", server);
        properties.put("connections.max.idle.ms", CONNECTIONS_MAX_IDLE_MS);
        properties.put("request.timeout.ms", REQUEST_TIMEOUT_MS);

        try (AdminClient client = KafkaAdminClient.create(properties)) {
            ListTopicsResult topics = client.listTopics(new ListTopicsOptions().timeoutMs(REQUEST_TIMEOUT_MS));
            Set<String> names = topics.names().get();
            log.info("当前kafka的topics为[{}]", names);
            log.info("kafka连接成功");
            return null;
        } catch (InterruptedException e) {
            // Don't treat interruption as a broker failure: restore the flag
            // and abort the retry loop.
            Thread.currentThread().interrupt();
            throw e;
        } catch (Exception e) {
            return e.toString();
        }
    }
}